package com.nx.platform.es.mq.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * MQ message listener.
 *
 * <p>Routes every received message to the {@link ConsumerHandler} stored in
 * {@link #handlerMap} under the message's tag.
 */
@Component
public class MQMessageListener implements MessageListenerConcurrently {

    private static final Logger logger = LoggerFactory.getLogger(MQMessageListener.class);

    /**
     * All {@link ConsumerHandler} implementations, injected automatically.
     * Handlers are looked up by message tag, so the map keys must match the tags in use.
     */
    @Resource
    private Map<String, ConsumerHandler> handlerMap;

    /**
     * Handles a batch of MQ messages by delegating each one to the handler registered for its tag.
     *
     * <p>Messages whose tag has no registered handler are logged and skipped. As soon as a handler
     * reports failure (returns {@code false}) or throws, processing stops and
     * {@link ConsumeConcurrentlyStatus#RECONSUME_LATER} is returned.
     *
     * @param msgList                    messages delivered in this batch
     * @param consumeConcurrentlyContext consume context supplied by the client (not used here)
     * @return {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS} when no handler failed,
     *         otherwise {@link ConsumeConcurrentlyStatus#RECONSUME_LATER}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList,
        ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt msgExt : msgList) {
            String tags = msgExt.getTags();
            ConsumerHandler handler = handlerMap.get(tags);

            if (handler == null) {
                // No handler for this tag: log and move on, the message is still acknowledged.
                logger.info("desc=cannot find consumer to consume msg={}", msgExt);
                if (logger.isDebugEnabled()) {
                    logger.debug("consumer handlerMapKey ={}", JSON.toJSONString(handlerMap.keySet()));
                }
                continue;
            }

            String keys = msgExt.getKeys();
            // Decode with an explicit charset so the result does not depend on the JVM default.
            // NOTE(review): assumes producers encode the body as UTF-8 - confirm.
            String body = new String(msgExt.getBody(), StandardCharsets.UTF_8);

            boolean consumed;
            try {
                consumed = handler.consume(tags, keys, body);
            } catch (Exception e) {
                // Log with message context before asking for redelivery; otherwise the
                // exception would leave this method without tag/key/body information.
                logger.error("desc=consumeMessage consume exception, tag={}, key={}, body={} ",
                    tags, keys, body, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            if (!consumed) {
                // Consumption failed; the middleware will deliver the message again.
                // NOTE(review): the status presumably applies to the whole batch, so messages
                // already handled in this call may be redelivered too - confirm batch size / ack index.
                logger.error("desc=consumeMessage consume fail, tag={}, key={}, body={} ", tags, keys, body);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            logger.info("desc=consumeMessage consume success, tag={}, key={}, body={} ", tags, keys, body);
        }
        // Every message was consumed successfully (or had no handler).
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
